use crate::qconsumer::{QConsumer, QMessageFilter};
use crate::qmessage::{QMessage, QMessageInfo, INVALID_MSG_ID, INVALID_SEQ};
use alloc::borrow::ToOwned;
use alloc::collections::LinkedList;
use alloc::sync::Arc;
use alloc::vec::Vec;
use embed_std::sync::mutex::Mutex;
use embed_std::thread::sleep_ms;
use embed_std::time::now;
use embed_std::{debugln, errorln, infoln, warnln};
use embed_std::{Error, Result};

/// Maximum age, in seconds, of a queued message or pending call entry.
///
/// It is added to `created_time` (taken from `now().as_secs()`) when deciding
/// whether an entry has expired and may be dropped during a publish.
const CALL_TIMEOUT: u32 = 10;

/// Handle to a message bus.
///
/// All state lives in a [`QBusInner`] behind an `Arc<Mutex<..>>`, so cloning
/// a `QBus` yields another handle to the same bus.
pub struct QBus {
    /// Bus state shared by every clone of this handle.
    inner: Arc<Mutex<QBusInner>>,
}

impl Clone for QBus {
    fn clone(&self) -> Self {
        QBus {
            inner: self.inner.clone(),
        }
    }
}

impl QBus {
    pub fn new(limit: usize) -> Self {
        QBus {
            inner: Arc::new(Mutex::new(QBusInner::new(limit))),
        }
    }
    /// Creates a consumer that holds its own handle (clone) to this bus.
    pub fn create_consumer(&self) -> QConsumer {
        let handle = self.clone();
        QConsumer::new(handle)
    }

    /// Creates a consumer bound to a clone of this bus, passing `filter` on to
    /// `QConsumer::new_with_filter`.
    pub fn create_consumer_with_filter(&self, filter: Box<dyn QMessageFilter>) -> QConsumer {
        let handle = self.clone();
        QConsumer::new_with_filter(handle, filter)
    }

    /// Locks the bus and returns a copy of the next message for `consumer`,
    /// if any (see `QBusInner::read`).
    ///
    /// Panics if `lock()` returns an error.
    pub(crate) fn read(&self, consumer: &QConsumer) -> Option<QMessage> {
        self.inner.lock().unwrap().read(consumer)
    }

    /// Locks the bus and runs `f` on the next message for `consumer`.
    ///
    /// Returns the result of `QBusInner::read_map`: `true` when a message was
    /// handed to `f`, `false` otherwise. `f` runs while the lock is held.
    ///
    /// Panics if `lock()` returns an error.
    pub(crate) fn read_map<F>(&self, consumer: &QConsumer, f: F) -> bool
    where
        F: FnOnce(&QMessage),
    {
        self.inner.lock().unwrap().read_map(consumer, f)
    }

    /// Locks the bus and copies the next message payload for `consumer` into
    /// `buff` (see `QBusInner::read_to_buff`).
    ///
    /// Returns the payload length and the message metadata, or `None` when
    /// nothing was copied.
    ///
    /// Panics if `lock()` returns an error.
    #[deprecated]
    pub(crate) fn read_to_buff(
        &self,
        consumer: &QConsumer,
        buff: &mut [u8],
    ) -> Option<(usize, QMessageInfo)> {
        self.inner.lock().unwrap().read_to_buff(consumer, buff)
    }

    /// Publishes `payload` on `topic` as a plain (non-request) message.
    ///
    /// The sequence number returned by `publish_internal` is discarded.
    pub fn publish(&self, topic: &str, payload: &[u8]) -> Result<()> {
        self.publish_internal(topic, payload, false).map(|_seq| ())
    }

    /// Publishes the concatenation of `vecs` on `topic` as a plain
    /// (non-request) message.
    ///
    /// The sequence number returned by `publish_vec_internal` is discarded.
    pub fn publish_vec(&self, topic: &str, vecs: &[&[u8]]) -> Result<()> {
        self.publish_vec_internal(topic, vecs, false).map(|_seq| ())
    }

    pub fn call(&self, topic: &str, payload: &[u8], timeout: u32) -> Result<Vec<u8>> {
        let seq = self.publish_internal(topic, payload, true)?;
        let step = 10;
        let count = if timeout % step != 0 {
            timeout / step + 1
        } else {
            timeout / step
        };
        for _ in 0..count {
            match self.pull_resp(seq) {
                Some(data) => {
                    self.remove_message(seq);
                    return Ok(data);
                }
                None => {
                    sleep_ms(step);
                }
            }
        }
        Err(Error::Timeout)
    }

    /// Locks the bus and stores `data` as the response for the pending call
    /// with sequence number `seq` (see `QBusInner::push_resp`).
    ///
    /// Panics if `lock()` returns an error.
    pub fn push_resp(&self, seq: u32, data: Vec<u8>) {
        self.inner.lock().unwrap().push_resp(seq, data)
    }

    /// Locks the bus and takes the response for call `seq`, if one has been
    /// pushed (see `QBusInner::pull_resp`).
    ///
    /// Panics if `lock()` returns an error.
    fn pull_resp(&self, seq: u32) -> Option<Vec<u8>> {
        self.inner.lock().unwrap().pull_resp(seq)
    }

    /// Locks the bus and queues a message built from `payload`.
    ///
    /// When `is_req` is true the message is registered as a call; the returned
    /// value is the sequence number produced by
    /// `QBusInner::publish_internal`.
    ///
    /// Panics if `lock()` returns an error.
    pub fn publish_internal(&self, topic: &str, payload: &[u8], is_req: bool) -> Result<u32> {
        self.inner
            .lock()
            .unwrap()
            .publish_internal(topic, payload, is_req)
    }

    /// Locks the bus and queues a message whose payload is the concatenation
    /// of `vecs`.
    ///
    /// When `is_req` is true the message is registered as a call; the returned
    /// value is the sequence number produced by
    /// `QBusInner::publish_vec_internal`.
    ///
    /// Panics if `lock()` returns an error.
    pub fn publish_vec_internal(&self, topic: &str, vecs: &[&[u8]], is_req: bool) -> Result<u32> {
        self.inner
            .lock()
            .unwrap()
            .publish_vec_internal(topic, vecs, is_req)
    }

    /// Locks the bus and removes the queued message whose `req_id` is `seq`
    /// (see `QBusInner::remove_message`).
    ///
    /// Panics if `lock()` returns an error.
    fn remove_message(&self, seq: u32) {
        self.inner.lock().unwrap().remove_message(seq)
    }

    /// Locks the bus and reserves a free cursor slot.
    ///
    /// Returns the slot index, or `None` when every slot is in use.
    ///
    /// Panics if `lock()` returns an error.
    pub fn alloc_cursor(&self) -> Option<usize> {
        self.inner.lock().unwrap().alloc_cursor()
    }

    /// Locks the bus and frees cursor slot `cursor_id`.
    ///
    /// Panics if `lock()` returns an error.
    pub fn reclaim_cursor(&self, cursor_id: usize) {
        self.inner.lock().unwrap().reclaim_cursor(cursor_id)
    }

    /// Locks the bus and moves cursor `cursor_id` to the most recently issued
    /// message id.
    ///
    /// Panics if `lock()` returns an error.
    pub fn reset_cursor(&self, cursor_id: usize) {
        self.inner.lock().unwrap().reset_cursor(cursor_id)
    }

    /// Locks the bus and returns the smallest cursor position as computed by
    /// `QBusInner::calc_min_pos`.
    ///
    /// Panics if `lock()` returns an error.
    pub fn get_min_pos(&self) -> u64 {
        self.inner.lock().unwrap().calc_min_pos()
    }

    pub fn get_cursor_pos(&self, cursor_id: usize) -> u64 {
        let inner = self.inner.lock().unwrap();
        if cursor_id < inner.cursors.len() {
            if let Some(ref cursor) = inner.cursors[cursor_id] {
                return cursor.seq;
            }
        }
        INVALID_MSG_ID
    }
}

impl Drop for QBus {
    fn drop(&mut self) {
        let count = Arc::strong_count(&self.inner);
        println!("drop QBus {}", count);
    }
}

/// Number of cursor slots in `QBusInner::cursors`, i.e. the maximum number of
/// cursors that can be allocated at the same time.
const MAX_CONSUMER: usize = 128;
/// Bus state guarded by the mutex inside [`QBus`].
pub(crate) struct QBusInner {
    /// Queued messages, oldest at the front.
    messages: LinkedList<QMessage>,
    /// Pending request/response entries, oldest at the front.
    calls: LinkedList<CallResponse>,
    /// Cursor slots; `None` marks a free slot.
    cursors: [Option<QCursor>; MAX_CONSUMER],
    /// Id given to the most recently published message.
    cur_idx: u64,
    /// Queue-length threshold used when evicting messages and pending calls.
    limit: usize,
    /// Sequence number given to the most recent request.
    cur_seq: u32,
}

impl QBusInner {
    /// Builds an empty bus state: no messages, no pending calls, every cursor
    /// slot free, and both counters at zero.
    pub(crate) fn new(limit: usize) -> Self {
        let free_slots = [None; MAX_CONSUMER];
        Self {
            limit,
            cur_idx: 0,
            cur_seq: 0,
            cursors: free_slots,
            messages: LinkedList::new(),
            calls: LinkedList::new(),
        }
    }
    /// Claims the first free cursor slot and positions it at the current
    /// message id, so the new cursor only sees messages published afterwards.
    ///
    /// Returns the slot index, or `None` when all slots are taken.
    pub(crate) fn alloc_cursor(&mut self) -> Option<usize> {
        let slot = self.cursors.iter().position(|entry| entry.is_none())?;
        self.cursors[slot] = Some(QCursor { seq: self.cur_idx });
        Some(slot)
    }

    /// Marks cursor slot `cursor_id` as free. Out-of-range ids are ignored.
    pub(crate) fn reclaim_cursor(&mut self, cursor_id: usize) {
        if let Some(slot) = self.cursors.get_mut(cursor_id) {
            *slot = None;
        }
    }

    /// Sets cursor slot `cursor_id` to the current message id, allocating the
    /// slot if it was free. Out-of-range ids are ignored.
    pub(crate) fn reset_cursor(&mut self, cursor_id: usize) {
        let head = self.cur_idx;
        if let Some(slot) = self.cursors.get_mut(cursor_id) {
            *slot = Some(QCursor { seq: head });
        }
    }

    /// Returns a clone of the next message delivered to `consumer`, or `None`
    /// when `read_map` delivers nothing.
    pub fn read(&mut self, consumer: &QConsumer) -> Option<QMessage> {
        let mut delivered = None;
        self.read_map(consumer, |msg| delivered = Some(msg.clone()));
        delivered
    }

    /// Copies the payload of the next message for `consumer` into `buff`.
    ///
    /// Returns the payload length together with the message metadata. When
    /// the payload does not fit into `buff`, an error is logged and `None` is
    /// returned; `read_map` has already advanced the cursor past that message
    /// by then.
    pub fn read_to_buff(
        &mut self,
        consumer: &QConsumer,
        buff: &mut [u8],
    ) -> Option<(usize, QMessageInfo)> {
        let mut outcome = None;
        self.read_map(consumer, |msg: &QMessage| {
            let needed = msg.payload.len();
            if needed <= buff.len() {
                buff[..needed].copy_from_slice(msg.payload.as_slice());
                let info = QMessageInfo {
                    id: msg.id,
                    topic: msg.topic.clone(),
                    req_id: msg.req_id,
                    created_time: msg.created_time,
                };
                outcome = Some((needed, info));
            } else {
                errorln!("buff {} is too small, data {}", buff.len(), needed);
            }
        });
        outcome
    }

    /// Hands the next message for `consumer` to `f`.
    ///
    /// Walks the queue from the oldest message, advancing the consumer's
    /// cursor over every message it has not visited yet. A message is
    /// delivered when it passes the consumer's filter and is either a plain
    /// message or a request that no consumer has matched before.
    ///
    /// Returns `true` if `f` was called, `false` if the cursor is missing or
    /// no message qualified.
    pub fn read_map<F>(&mut self, consumer: &QConsumer, f: F) -> bool
    where
        F: FnOnce(&QMessage),
    {
        let cursor = match self.cursors.get_mut(consumer.cursor()) {
            Some(Some(cursor)) => cursor,
            _ => return false,
        };
        for msg in self.messages.iter_mut() {
            let unseen = cursor.seq == INVALID_MSG_ID || msg.id > cursor.seq;
            if !unseen {
                continue;
            }
            // The cursor moves forward even when the filter rejects the message.
            cursor.seq = msg.id;
            if !consumer.match_filter(msg) {
                continue;
            }
            msg.consume_count += 1;
            // Requests go to the first matching consumer only.
            if msg.req_id == INVALID_SEQ || msg.consume_count <= 1 {
                f(msg);
                return true;
            }
        }
        false
    }

    /// Stores `data` as the response of the first pending call whose sequence
    /// number is `seq`. Does nothing when no such call is pending.
    pub fn push_resp(&mut self, seq: u32, data: Vec<u8>) {
        if let Some(pending) = self.calls.iter_mut().find(|call| call.seq == seq) {
            pending.resp = Some(data);
        }
    }

    /// Removes the pending call `seq` and returns its response, provided a
    /// response has already been pushed.
    ///
    /// Returns `None` and leaves the pending entry in place when the call is
    /// unknown or still unanswered.
    fn pull_resp(&mut self, seq: u32) -> Option<Vec<u8>> {
        // `position` replaces the hand-rolled `-1` sentinel and i32 counter.
        let index = self
            .calls
            .iter()
            .position(|call| call.seq == seq && call.resp.is_some())?;
        self.calls.remove(index).resp
    }

    /// Advances and returns the message id counter.
    ///
    /// After `u64::MAX` the counter restarts at 1.
    fn get_msg_id(&mut self) -> u64 {
        let next = if self.cur_idx == u64::MAX {
            1
        } else {
            self.cur_idx + 1
        };
        self.cur_idx = next;
        next
    }

    /// Advances and returns the call sequence counter.
    ///
    /// After `u32::MAX` the counter restarts at 1.
    fn get_call_seq(&mut self) -> u32 {
        self.cur_seq = match self.cur_seq {
            u32::MAX => 1,
            current => current + 1,
        };
        self.cur_seq
    }

    /// Queues a new message carrying a copy of `payload` on `topic`.
    ///
    /// When `is_req` is true a fresh call sequence number is allocated and a
    /// pending call entry is registered. Before registering it, entries at
    /// the front of the call list are dropped while they are older than
    /// `CALL_TIMEOUT` seconds or while the list is longer than `limit`.
    /// Old messages are purged via `do_clean` before the new one is appended.
    ///
    /// Returns the call sequence number, or `INVALID_SEQ` for plain messages.
    pub fn publish_internal(&mut self, topic: &str, payload: &[u8], is_req: bool) -> Result<u32> {
        let cur = now().as_secs() as i64;
        let mut seq = INVALID_SEQ;
        if is_req {
            seq = self.get_call_seq();
            while let Some(item) = self.calls.front() {
                let expired = cur > item.created_time + CALL_TIMEOUT as i64;
                if !expired && self.calls.len() <= self.limit {
                    break;
                }
                warnln!("call seq {} ts {} timeout", item.seq, item.created_time);
                self.calls.pop_front();
            }
            self.calls.push_back(CallResponse {
                created_time: cur,
                seq,
                resp: None,
            });
        }
        let cur_idx = self.get_msg_id();
        self.do_clean(cur);

        self.messages.push_back(QMessage {
            id: cur_idx,
            topic: topic.to_owned(),
            payload: Vec::from(payload),
            req_id: seq,
            created_time: cur,
            consume_count: 0,
        });
        // Log the message that was just queued; the previous code logged the
        // front (oldest) entry under the "new msg" label.
        debugln!(
            "new msg {} ts {} message len {} call len {}",
            cur_idx,
            cur,
            self.messages.len(),
            self.calls.len()
        );
        Ok(seq)
    }

    /// Queues a new message on `topic` whose payload is the concatenation of
    /// all slices in `vecs`, in order.
    ///
    /// When `is_req` is true a fresh call sequence number is allocated and a
    /// pending call entry is registered. Before registering it, entries at
    /// the front of the call list are dropped while they are older than
    /// `CALL_TIMEOUT` seconds or while the list is longer than `limit`.
    /// Old messages are purged via `do_clean` before the new one is appended.
    ///
    /// Returns the call sequence number, or `INVALID_SEQ` for plain messages.
    pub fn publish_vec_internal(
        &mut self,
        topic: &str,
        vecs: &[&[u8]],
        is_req: bool,
    ) -> Result<u32> {
        let cur = now().as_secs() as i64;
        let mut seq = INVALID_SEQ;
        if is_req {
            seq = self.get_call_seq();
            while let Some(item) = self.calls.front() {
                let expired = cur > item.created_time + CALL_TIMEOUT as i64;
                if !expired && self.calls.len() <= self.limit {
                    break;
                }
                warnln!("call seq {} ts {} timeout", item.seq, item.created_time);
                self.calls.pop_front();
            }
            self.calls.push_back(CallResponse {
                created_time: cur,
                seq,
                resp: None,
            });
        }
        let cur_idx = self.get_msg_id();
        self.do_clean(cur);

        // Size the buffer once so the concatenation does not reallocate.
        let payload_len: usize = vecs.iter().map(|part| part.len()).sum();
        let mut payload = Vec::with_capacity(payload_len);
        for part in vecs {
            payload.extend_from_slice(part);
        }
        self.messages.push_back(QMessage {
            id: cur_idx,
            topic: topic.to_owned(),
            payload,
            req_id: seq,
            created_time: cur,
            consume_count: 0,
        });
        // Log the message that was just queued; the previous code logged the
        // front (oldest) entry under the "new msg" label.
        debugln!(
            "new msg {} ts {} message len {} call len {}",
            cur_idx,
            cur,
            self.messages.len(),
            self.calls.len()
        );
        Ok(seq)
    }

    /// Drops messages from the front of the queue until the oldest remaining
    /// one must be kept.
    ///
    /// A front message is removed when any of these holds:
    /// - its id is at or below the minimum cursor position (`calc_min_pos`);
    /// - it is older than `CALL_TIMEOUT` seconds relative to `cur`;
    /// - the queue holds more than `limit` messages.
    ///
    /// `cur` is the current time in seconds. Cursor positions are dumped to
    /// the debug log on every call.
    fn do_clean(&mut self, cur: i64) {
        let min_seq = self.calc_min_pos();
        self.dump_cursors();
        // `front()` doubles as the emptiness check, so no separate
        // `is_none()` branch or `unwrap()` is needed.
        while let Some(msg) = self.messages.front() {
            if msg.id <= min_seq {
                debugln!("cur {} min pos {} remove it", msg.id, min_seq);
            } else if cur > msg.created_time + CALL_TIMEOUT as i64 {
                warnln!(
                    "cur {} min_seq {} msg {} ts {} consumed {} timeout, force to remove! ",
                    cur,
                    min_seq,
                    msg.id,
                    msg.created_time,
                    msg.consume_count
                );
            } else if self.messages.len() > self.limit {
                warnln!("force to move forward! msg {}", msg.id);
            } else {
                break;
            }
            self.messages.pop_front();
        }
    }

    /// Returns the lowest position among all allocated cursors.
    ///
    /// The starting value is the id of the newest queued message, or the
    /// current id counter when the queue is empty. Cursors holding
    /// `INVALID_MSG_ID` are ignored.
    pub fn calc_min_pos(&self) -> u64 {
        let newest = match self.messages.back() {
            Some(msg) => msg.id,
            None => self.cur_idx,
        };
        self.cursors
            .iter()
            .flatten()
            .map(|cursor| cursor.seq)
            .filter(|&pos| pos != INVALID_MSG_ID)
            .fold(newest, |lowest, pos| if pos < lowest { pos } else { lowest })
    }

    /// Removes the newest queued message whose `req_id` equals `seq`, if any.
    pub fn remove_message(&mut self, seq: u32) {
        let pos = match self.messages.iter().rposition(|item| item.req_id == seq) {
            Some(pos) => pos,
            None => return,
        };
        debugln!("seq {} remove position {}", seq, pos);
        self.messages.remove(pos);
    }

    /// Writes the position of every allocated cursor to the debug log.
    pub fn dump_cursors(&self) {
        for (index, slot) in self.cursors.iter().enumerate() {
            if let Some(cursor) = slot {
                debugln!("cursor [{}] pos [{}]", index, cursor.seq);
            }
        }
    }
}

impl Drop for QBusInner {
    /// Logs how many messages and pending calls were still queued when the
    /// bus state is destroyed.
    fn drop(&mut self) {
        let queued = self.messages.len();
        let pending = self.calls.len();
        infoln!("drop QBus: messages len {} call len {}", queued, pending);
    }
}

/// Bookkeeping entry for a request that is waiting for its response.
pub(crate) struct CallResponse {
    /// Time the request was published, in seconds (from `now().as_secs()`).
    pub created_time: i64,
    /// Sequence number that identifies the request.
    pub seq: u32,
    /// Response payload; `None` until `push_resp` stores one.
    pub resp: Option<Vec<u8>>,
}

/// Read position of one consumer in the message queue.
#[derive(Copy, Clone)]
pub(crate) struct QCursor {
    /// Id of the last message this cursor has moved past; messages with a
    /// larger id are still unread for it.
    pub(crate) seq: u64,
}
